from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark import find_spark_home

# Create (or reuse) the Spark session for this script: local master,
# application name 'HelloSpark'.
spark = (
    SparkSession.builder
    .master('local')
    .appName('HelloSpark')
    .getOrCreate()
)

# Load the Yelp user dataset.
# The previous code passed option('multline', True). That key is misspelled
# (Spark's option is 'multiLine'), so it was never applied and the file has
# always been parsed in the default one-JSON-record-per-line mode. The dead
# option is removed; parsing behaviour is unchanged.
# NOTE(review): if the file were a single multi-line JSON document, the
# correct spelling would be .option('multiLine', True) - confirm before adding.
df_user = spark.read.json('dataset/yelp_academic_dataset_user.json')

# Turn the comma-separated 'friends' string into one row per (user, friend).
# Splitting on a bare "," left leading whitespace on every id after the first
# whenever the list is written as "a, b, c", so those ids could never equal a
# 'user_id' in a later join. Split on the comma plus any surrounding
# whitespace instead, and drop empty tokens produced by an empty list string.
df_user = df_user \
    .withColumn('friend', explode(split(trim(col('friends')), r'\s*,\s*'))) \
    .where(col('friend') != '')

# Named views over the exploded (user, friend) rows.

# Friend rows of the single user with user_id == 'q_QQ5kBBwlCcbL1s4NVK3g'.
df_user_main = (
    df_user
    .filter("user_id == 'q_QQ5kBBwlCcbL1s4NVK3g'")
    .select('user_id', 'friend')
)

# The same (user, friend) pairs for every user, renamed so that both columns
# stay distinguishable from df_user_main's columns after a join.
df_friend = df_user.select(
    col('user_id').alias('friend1_id'),
    col('friend').alias('friend2_id'),
)

# Join df_user_main with df_friend: for each friend of the target user, pull
# in that friend's own friend list (friend2_id = friend-of-friend candidate).
# An inner join is used because the previous left-outer join followed by an
# isNotNull filter on friend2_id kept exactly the matched rows anyway.
df_user_friend = df_user_main.join(
    df_friend,
    df_user_main['friend'] == df_friend['friend1_id'],
    'inner',
)

# Remove candidates that are not useful recommendations:
#   * the target user themselves (when friend lists are reciprocal, every
#     friend's list leads straight back to the target user), and
#   * users who are already in the target user's friend list.
# The left_anti join keeps only rows with no match in df_existing_friends and
# returns just the left-hand columns, so the resulting schema is unchanged.
df_existing_friends = df_user_main.select(col('friend').alias('existing_friend_id'))
df_user_friend = df_user_friend \
    .where(col('friend2_id') != col('user_id')) \
    .join(
        df_existing_friends,
        col('friend2_id') == col('existing_friend_id'),
        'left_anti',
    )

# Rank the recommendation candidates by popularity: count how many joined
# rows mention each friend2_id and order from the highest count down.
df_user_friend_now = (
    df_user_friend
    .groupBy(col('friend2_id'))
    .agg(count(col('friend2_id')).alias('count'))
    .sort(desc('count'))
)

# Print the header ("recommended user ids") followed by the top 5 candidates.
print('推荐用户id')
# truncate=False: show() shortens strings longer than 20 characters by
# default, and the ids handled here are longer than that (the user_id this
# script filters on is 22 characters), so the default output would cut every
# recommended id off.
df_user_friend_now.select('friend2_id').show(5, truncate=False)












